package com.danan.data_collector.util;

import com.danan.data_collector.deserialization.MyCdcDeserialization;
import com.danan.data_collector.deserialization.MyOracleCdcDeserialization;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Collectors;

/**
 * Factory methods for Flink CDC change streams from MySQL, Oracle and SQL Server.
 *
 * <p>Connection settings are read once, in the static initializer, from {@code ConfigUtil} using the
 * keys {@code data.source.hostname}, {@code data.source.port}, {@code data.source.database},
 * {@code data.source.username} and {@code data.source.password}. The startup mode is read from
 * {@code environment.cdc.model} and must be {@code initial} or {@code latest}; any other value
 * (including a missing one) makes every factory method throw.
 *
 * @Author: NanHuang
 * @Date: 2023/05/28/10:20
 */
public class DataStreamUtil {

    private static final String hostname;
    private static final int port;
    private static final String database;
    private static final String username;
    private static final String password;

    /** Raw startup-mode name from {@code environment.cdc.model}; validated by {@link #useLatestOffset()}. */
    private static final String cdcModel;

    /** Message of the exception thrown when the configured startup mode is missing or unsupported. */
    private static final String UNDEFINED_CDC_MODEL_MESSAGE =
            "The cdc model is not defined. Please select a cdc model from the list [ initial , latest ]";

    static {
        hostname = ConfigUtil.getProperty("data.source.hostname");
        port = Integer.parseInt(ConfigUtil.getProperty("data.source.port"));
        database = ConfigUtil.getProperty("data.source.database");
        username = ConfigUtil.getProperty("data.source.username");
        password = ConfigUtil.getProperty("data.source.password");
        cdcModel = ConfigUtil.getProperty("environment.cdc.model");
    }

    /**
     * Resolves the configured startup mode.
     *
     * @return {@code false} for {@code initial} (snapshot existing rows, then stream changes),
     *     {@code true} for {@code latest} (stream only changes made after the job starts)
     * @throws RuntimeException if the mode is missing or is not {@code initial} / {@code latest}
     */
    private static boolean useLatestOffset() {
        // Constant-first equals keeps a missing property (null) on the descriptive error path
        // instead of the NullPointerException a switch on null would raise.
        if ("initial".equals(cdcModel)) {
            return false;
        }
        if ("latest".equals(cdcModel)) {
            return true;
        }
        throw new RuntimeException(UNDEFINED_CDC_MODEL_MESSAGE);
    }

    /**
     * Creates a MySQL CDC stream covering every table of {@code schema}.
     *
     * @param env    the Flink environment the source is registered on
     * @param schema database/schema name; the table filter is {@code schema + ".*"}
     * @return the source stream, named {@code mysql_cdc}, without watermarks
     * @throws RuntimeException if the configured startup mode is missing or unsupported
     */
    public static DataStreamSource<String> getMySQLDataStream(StreamExecutionEnvironment env, String schema) {
        // "initial" must take a full snapshot before reading the binlog, like the Oracle and
        // SQL Server sources do; earliest() would skip the snapshot and replay only retained binlog.
        StartupOptions startupOptions = useLatestOffset() ? StartupOptions.latest() : StartupOptions.initial();
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(hostname)
                .port(port)
                .databaseList(database)
                .tableList(schema + ".*")
                .username(username)
                .password(password)
                .startupOptions(startupOptions)
                .deserializer(new MyCdcDeserialization())
                .build();
        return env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_cdc");
    }

    /**
     * Creates an Oracle CDC stream covering every table of {@code schema}.
     *
     * <p>The connection uses the service-name URL form {@code jdbc:oracle:thin:@//host:port/database}
     * and assumes a CDB+PDB deployment in which the configured database is the PDB.
     *
     * @param env    the Flink environment the source is registered on
     * @param schema schema name; the table filter is {@code schema + ".*"}
     * @return the source stream
     * @throws RuntimeException if the configured startup mode is missing or unsupported
     */
    public static DataStreamSource<String> getOracleDataStream(StreamExecutionEnvironment env, String schema) {
        boolean latest = useLatestOffset();

        String url = String.format(
                "jdbc:oracle:thin:@//%s:%s/%s",
                hostname,
                port,
                database
        );

        Properties properties = new Properties();
        // Optional LogMiner tuning, currently disabled:
//        properties.setProperty("log.mining.strategy", "online_catalog");
//        properties.setProperty("log.mining.continuous.mine", "true");

        // Enable capture of BLOB/CLOB columns; otherwise their values arrive as null.
        properties.setProperty("lob.enabled", "true");

        // Required when connecting to Oracle by service name.
        properties.setProperty("database.url", url);

        // CDB+PDB mode requires both properties: database.dbname is the CDB name,
        // database.pdb.name is the PDB name.
        // NOTE(review): the CDB name is hard-coded to "orcl"; consider making it configurable.
        properties.setProperty("database.dbname", "orcl");
        properties.setProperty("database.pdb.name", database);

        DebeziumSourceFunction<String> oracleSource = OracleSource.<String>builder()
                .hostname(hostname)
                .port(port)
                .database(database)
                .tableList(schema + ".*")
                .username(username)
                .password(password)
                .startupOptions(latest
                        ? com.ververica.cdc.connectors.base.options.StartupOptions.latest()
                        : com.ververica.cdc.connectors.base.options.StartupOptions.initial())
                .deserializer(new MyOracleCdcDeserialization())
                .debeziumProperties(properties)
                .build();
        return env.addSource(oracleSource);
    }

    /**
     * Creates a SQL Server CDC stream covering every table of {@code schema}.
     *
     * @param env    the Flink environment the source is registered on
     * @param schema schema name; the table filter is {@code schema + ".*"}
     * @return the source stream
     * @throws RuntimeException if the configured startup mode is missing or unsupported
     */
    public static DataStreamSource<String> getSqlServerDataStream(StreamExecutionEnvironment env, String schema) {
        boolean latest = useLatestOffset();
        DebeziumSourceFunction<String> sqlserverSource = SqlServerSource.<String>builder()
                .hostname(hostname)
                .port(port)
                .database(database)
                .tableList(schema + ".*")
                .username(username)
                .password(password)
                .startupOptions(latest
                        ? com.ververica.cdc.connectors.sqlserver.table.StartupOptions.latest()
                        : com.ververica.cdc.connectors.sqlserver.table.StartupOptions.initial())
                // Converts SourceRecord to a JSON string.
                // NOTE(review): reuses the Oracle deserializer for SQL Server; confirm this is intended.
                .deserializer(new MyOracleCdcDeserialization())
                .build();
        return env.addSource(sqlserverSource);
    }

}
